package com.ivyft.hive.serde2.protobuf;

import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
import com.ivyft.hive.hadoop.IntLengthHeaderInputFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;

import java.lang.reflect.Method;
import java.util.*;

/**
 * Read-only Hive SerDe that converts a serialized protobuf message into a Hive row.
 *
 * <p>The protobuf message class is taken, in order, from the table property
 * {@link IntLengthHeaderInputFormat#PROTOBUF_CLASS}, the table property
 * {@code protobuf.output.clazz}, or the Hadoop configuration entry
 * {@link IntLengthHeaderInputFormat#PROTOBUF_CLASS}. Incoming records must be
 * {@link BytesWritable}s; each Hive column is filled from the protobuf field whose
 * name equals the column name, ignoring case. Serialization is not supported.
 *
 * <p>Created 15/11/7 21:22.
 *
 * @author zhenqin
 */
public class ProtobufSerde extends AbstractSerDe {

    /** Column names as listed in the table properties, trimmed. */
    private List<String> columnNames = null;

    /** Lower-cased copy of {@link #columnNames}, used as lookup keys into {@link #fieldDescriptorMap}. */
    protected List<String> copyColumnNames = null;

    /** Hive type of every column, parallel to {@link #columnNames}. */
    private List<TypeInfo> columnTypes = null;

    /** Struct inspector describing the rows returned by {@link #deserialize(Writable)}. */
    private ObjectInspector objectInspector = null;

    /** Generated protobuf message class configured for this table. */
    protected Class<? extends GeneratedMessage> objectClass;

    /** Protobuf field descriptors keyed by lower-cased field name; filled on the first deserialized record. */
    protected final Map<String, Descriptors.FieldDescriptor> fieldDescriptorMap =
            new HashMap<String, Descriptors.FieldDescriptor>();

    /** The static {@code parseFrom(byte[])} method of {@link #objectClass}. */
    protected Method parseMethod;

    protected static Log LOG = LogFactory.getLog(ProtobufSerde.class);


    public ProtobufSerde() {
    }


    /**
     * Resolves the protobuf message class and builds the row object inspector from the
     * column names and column types found in {@code properties}.
     *
     * @param configuration Hadoop configuration, consulted for the message class only when
     *                      the table properties do not name one; may be {@code null}
     * @param properties    table properties
     * @throws SerDeException           if the number of column names and column types differ
     * @throws IllegalArgumentException if no message class is configured, the class cannot be
     *                                  loaded, is not a {@link GeneratedMessage}, or has no
     *                                  public {@code parseFrom(byte[])} method
     */
    @Override
    public void initialize(Configuration configuration, Properties properties) throws SerDeException {
        LOG.info("serde initialize");

        LOG.info("start properties output: ");
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            LOG.info(entry.getKey() + "\t" + entry.getValue());
        }
        LOG.info("end properties output.");

        // Descriptors cached by a previous initialize() may belong to another message class.
        fieldDescriptorMap.clear();

        String clazz = properties.getProperty(IntLengthHeaderInputFormat.PROTOBUF_CLASS);
        if (StringUtils.isBlank(clazz)) {
            clazz = properties.getProperty("protobuf.output.clazz");
        }

        // The configuration is only a fallback, so tolerate a null one.
        if (StringUtils.isBlank(clazz) && configuration != null) {
            clazz = configuration.get(IntLengthHeaderInputFormat.PROTOBUF_CLASS);
        }

        if (StringUtils.isBlank(clazz)) {
            throw new IllegalArgumentException("can not reserve protobuf object class, set conf '"
                    + IntLengthHeaderInputFormat.PROTOBUF_CLASS+"' or 'protobuf.output.clazz' property.");
        }

        try {
            // asSubclass verifies the type now instead of failing later while deserializing.
            this.objectClass = Class.forName(clazz).asSubclass(GeneratedMessage.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("can not reserve protobuf object class: " + clazz, e);
        } catch (ClassCastException e) {
            throw new IllegalArgumentException("protobuf object class is not a GeneratedMessage: " + clazz, e);
        }

        LOG.info("reserve protobuf class: " + this.objectClass.getName());

        try {
            parseMethod = objectClass.getMethod("parseFrom", byte[].class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("can not reserve protobuf object class: " + clazz, e);
        }

        // Read column names; the lower-cased copy is what protobuf fields are matched against.
        String columnNameProp = properties.getProperty(Constants.LIST_COLUMNS);

        if (columnNameProp != null && columnNameProp.length() > 0) {
            String[] splits = columnNameProp.split(",");
            columnNames = new ArrayList<String>(splits.length);
            copyColumnNames = new ArrayList<String>(splits.length);
            for (String split : splits) {
                String name = StringUtils.trim(split);
                columnNames.add(name);
                copyColumnNames.add(StringUtils.lowerCase(name));
            }
        } else {
            columnNames = new ArrayList<String>();
            copyColumnNames = new ArrayList<String>();
        }

        LOG.info("column: " + columnNames);

        // Read column types; when none are declared every column defaults to string.
        String columnTypeProp = properties.getProperty(Constants.LIST_COLUMN_TYPES);
        if (columnTypeProp == null) {
            String[] types = new String[columnNames.size()];
            Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);

            columnTypeProp = StringUtils.join(types, ":");
        }

        columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);
        if (columnTypes.size() != columnNames.size()) {
            throw new SerDeException("len(columnNames) != len(columntTypes)");
        }

        LOG.info("column type: " + columnTypes);

        // One standard Java object inspector per column, combined into a struct inspector.
        List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size());
        for (TypeInfo columnType : columnTypes) {
            columnOIs.add(TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(columnType));
        }
        objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
    }


    /**
     * Parses {@code bytes} into a message by reflectively invoking the static
     * {@code parseFrom(byte[])} method of the configured class.
     *
     * @param bytes serialized protobuf message; every byte of the array is parsed
     * @return the parsed message
     * @throws IllegalStateException if the invocation or the parsing fails
     */
    public GeneratedMessage reserveObject(byte[] bytes) {
        // TODO: check how the Netty protobuf decoder implements this.
        try {
            return (GeneratedMessage) parseMethod.invoke(null, bytes);
        } catch (Exception e) {
            throw new IllegalStateException("reflection error!", e);
        }
    }


    @Override
    public Class<? extends Writable> getSerializedClass() {
        return BytesWritable.class;
    }

    /**
     * Not supported: this SerDe only reads.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
        throw new UnsupportedOperationException();
    }

    /**
     * @return always {@code null}
     */
    @Override
    public SerDeStats getSerDeStats() {
        return null;
    }

    /**
     * Converts one record into a row holding one value per configured column.
     *
     * @param wr the record; expected to be a {@link BytesWritable} holding a serialized message
     * @return the row as a list ordered like the configured columns, or {@code null} when
     *         {@code wr} is {@code null} or is not a {@link BytesWritable}; a column with no
     *         protobuf field of the same (case-insensitive) name yields a {@code null} entry
     * @throws SerDeException declared by the interface; not thrown by this implementation
     */
    @Override
    public Object deserialize(Writable wr) throws SerDeException {
        if (wr == null) {
            return null;
        }
        if (!(wr instanceof BytesWritable)) {
            return null;
        }

        BytesWritable value = (BytesWritable) wr;
        // getBytes() exposes the backing buffer, which may be longer than the record;
        // only the first getLength() bytes are valid, so never parse the padding.
        byte[] buffer = value.getBytes();
        int length = value.getLength();
        byte[] payload = buffer.length == length ? buffer : Arrays.copyOf(buffer, length);

        GeneratedMessage message = reserveObject(payload);

        // Build the name -> descriptor index once, from the first message seen.
        if (fieldDescriptorMap.isEmpty()) {
            for (Descriptors.FieldDescriptor field : message.getDescriptorForType().getFields()) {
                fieldDescriptorMap.put(StringUtils.lowerCase(field.getName()), field);
            }
        }

        List<Object> row = new ArrayList<Object>(columnNames.size());
        for (String columnName : copyColumnNames) {
            Descriptors.FieldDescriptor field = fieldDescriptorMap.get(columnName);
            if (field == null) {
                // The table declares a column the message does not have.
                row.add(null);
            } else {
                row.add(getStructVal(field, message.getField(field)));
            }
        }
        return row;
    }


    /**
     * Converts a protobuf field value into the object placed in the Hive row.
     *
     * <ul>
     *   <li>repeated fields are returned unchanged;</li>
     *   <li>enum values become their name, or the field default when the name is blank;</li>
     *   <li>blank strings become the field default, or {@code null} when that default is
     *       {@code ""} or {@code "null"};</li>
     *   <li>nested messages become a map from field name to converted value, leaving out
     *       fields whose converted value is {@code null};</li>
     *   <li>every other value is returned unchanged.</li>
     * </ul>
     *
     * @param field descriptor of the field that {@code v} was read from
     * @param v     the field value; may be {@code null}
     * @return the converted value, or {@code null} when {@code v} is {@code null}
     */
    public Object getStructVal(Descriptors.FieldDescriptor field, Object v) {
        if (v == null) {
            return null;
        }
        if (field.isRepeated()) {
            return v;
        }

        Descriptors.FieldDescriptor.JavaType javaType = field.getJavaType();

        if (javaType == Descriptors.FieldDescriptor.JavaType.ENUM) {
            String enumName = ((Descriptors.EnumValueDescriptor) v).getName();
            if (!StringUtils.isBlank(enumName)) {
                return enumName;
            }
            String fallback = String.valueOf(field.getDefaultValue());
            return "null".equals(fallback) ? null : fallback;
        }

        if (javaType == Descriptors.FieldDescriptor.JavaType.STRING) {
            String text = (String) v;
            if (!StringUtils.isBlank(text)) {
                return text;
            }
            String fallback = (String) field.getDefaultValue();
            return "".equals(fallback) || "null".equals(fallback) ? null : fallback;
        }

        if (javaType == Descriptors.FieldDescriptor.JavaType.MESSAGE) {
            GeneratedMessage inner = (GeneratedMessage) v;
            Map<String, Object> result = new HashMap<String, Object>();
            for (Descriptors.FieldDescriptor innerField : inner.getDescriptorForType().getFields()) {
                Object converted = getStructVal(innerField, inner.getField(innerField));
                if (converted != null) {
                    result.put(innerField.getName(), converted);
                }
            }
            return result;
        }

        // getField() already yields the declared default for an unset field, so the value
        // read from the message is always the right answer; substituting the default here
        // would discard real data for every field that declares one.
        return v;
    }


    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return this.objectInspector;
    }
}
